#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# RDD in R implemented in S4 OO system.

# Register the S3 class "jobj" (a reference to an object on the JVM) so it
# can be used as a slot type in the S4 class definitions below.
setOldClass("jobj")

# @title S4 class that represents an RDD
# @description RDD can be created using functions like
#              \code{parallelize}, \code{textFile} etc.
# @rdname RDD
# @seealso parallelize, textFile
#
# @slot env An R environment that stores bookkeeping states of the RDD
# @slot jrdd Java object reference to the backing JavaRDD
# @export
# env holds mutable bookkeeping state (isCached, isCheckpointed,
# serializedMode) that must survive R's call-by-value semantics;
# jrdd is the reference to the backing JavaRDD.
setClass("RDD",
         slots = list(env = "environment",
                      jrdd = "jobj"))

# A PipelinedRDD chains an R transformation onto a parent RDD without
# materializing an intermediate JavaRDD.
# prev: the parent RDD; func: a function of (partIndex, part) applied to
# each partition; prev_jrdd: the JavaRDD at the start of this pipeline
# stage (see the initialize method below).
setClass("PipelinedRDD",
         slots = list(prev = "RDD",
                      func = "function",
                      prev_jrdd = "jobj"),
         contains = "RDD")

# Initializer for RDD: validates the serialization mode and sets up the
# mutable bookkeeping environment.
#
# @param .Object The RDD instance being initialized
# @param jrdd Java object reference to the backing JavaRDD
# @param serializedMode One of "byte", "string" or "row"
# @param isCached TRUE if the RDD is cached
# @param isCheckpointed TRUE if the RDD has been checkpointed
setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
                                        isCached, isCheckpointed) {
  # Check that RDD constructor is using the correct type of serializedMode.
  # is.character() is preferred over `class(x) == "character"`, which is
  # fragile for objects carrying more than one class attribute.
  stopifnot(is.character(serializedMode))
  stopifnot(serializedMode %in% c("byte", "string", "row"))
  # RDD has three serialization types:
  # byte: The RDD stores data serialized in R.
  # string: The RDD stores data as strings.
  # row: The RDD stores the serialized rows of a DataFrame.

  # We use an environment to store mutable states inside an RDD object.
  # Note that R's call-by-value semantics makes modifying slots inside an
  # object (passed as an argument into a function, such as cache()) difficult:
  # i.e. one needs to make a copy of the RDD object and sets the new slot value
  # there.

  # The slots are inheritable from superclass. Here, both `env' and `jrdd' are
  # inherited from RDD, but only the former is used.
  .Object@env <- new.env()
  .Object@env$isCached <- isCached
  .Object@env$isCheckpointed <- isCheckpointed
  .Object@env$serializedMode <- serializedMode

  .Object@jrdd <- jrdd
  .Object
})

# Print a one-line, JVM-provided description of the RDD.
setMethod("show", "RDD",
          function(object) {
              # paste0() is the idiomatic replacement for paste(..., sep = "").
              cat(paste0(callJMethod(getJRDD(object), "toString"), "\n"))
          })

# Initializer for PipelinedRDD. Chains the R function `func` onto `prev`
# without materializing a JavaRDD: consecutive narrow transformations are
# composed into a single function so they run in one pass over each
# partition when the JavaRDD is finally computed (see getJRDD).
#
# @param .Object The PipelinedRDD instance being initialized
# @param prev The parent RDD this transformation applies to
# @param func A function of (partIndex, part) applied to each partition
# @param jrdd_val A previously materialized JavaRDD for this RDD, or NULL
#        if it has not been computed yet (the usual case)
setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
  .Object@env <- new.env()
  .Object@env$isCached <- FALSE
  .Object@env$isCheckpointed <- FALSE
  # jrdd_val caches the materialized JavaRDD; NULL until getJRDD runs.
  .Object@env$jrdd_val <- jrdd_val
  if (!is.null(jrdd_val)) {
    # This tracks the serialization mode for jrdd_val
    .Object@env$serializedMode <- prev@env$serializedMode
  }

  .Object@prev <- prev

  # A parent can only be pipelined over if it has not been materialized
  # through caching or checkpointing.
  isPipelinable <- function(rdd) {
    e <- rdd@env
    # nolint start
    !(e$isCached || e$isCheckpointed)
    # nolint end
  }

  if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
    # This transformation is the first in its stage:
    .Object@func <- cleanClosure(func)
    .Object@prev_jrdd <- getJRDD(prev)
    .Object@env$prev_serializedMode <- prev@env$serializedMode
    # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
    # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
  } else {
    # Compose the parent's pipelined function with this one so that both
    # run in a single pass over each partition.
    pipelinedFunc <- function(partIndex, part) {
      f <- prev@func
      func(partIndex, f(partIndex, part))
    }
    .Object@func <- cleanClosure(pipelinedFunc)
    .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
    # Get the serialization mode of the parent RDD
    .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
  }

  .Object
})

# @rdname RDD
# @export
#
# @param jrdd Java object reference to the backing JavaRDD
# @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
# stores strings, and "row" if the RDD stores the rows of a DataFrame
# @param isCached TRUE if the RDD is cached
# @param isCheckpointed TRUE if the RDD has been checkpointed
# Construct an RDD wrapper around an existing JavaRDD reference.
RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
                isCheckpointed = FALSE) {
  # Delegate to the S4 initializer, which validates serializedMode.
  new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
}

# Construct a PipelinedRDD that applies func on top of prev.
PipelinedRDD <- function(prev, func) {
  # jrdd_val starts as NULL; it is filled in lazily by getJRDD.
  new("PipelinedRDD", prev, func, NULL)
}

# Return the serialization mode for an RDD.
# Dispatch is on the class of `rdd`; extra args are forwarded via `...`.
setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") })
# For normal RDDs we can directly read the serializedMode
# A plain RDD always knows its serialization mode.
setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) {
  rdd@env$serializedMode
})
# For pipelined RDDs if jrdd_val is set then serializedMode should exist
# if not we return the defaultSerialization mode of "byte" as we don't know the serialization
# mode at this point in time.
setMethod("getSerializedMode", signature(rdd = "PipelinedRDD"),
          function(rdd) {
            # Until the JavaRDD has been materialized (jrdd_val set) the
            # mode is unknown, so fall back to the default of "byte".
            if (is.null(rdd@env$jrdd_val)) "byte" else rdd@env$serializedMode
          })

# The jrdd accessor function.
# Plain RDDs hold their JavaRDD reference directly in the jrdd slot.
setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) {
  rdd@jrdd
})
# Materialize a PipelinedRDD into a JavaRDD, caching the result in
# rdd@env$jrdd_val so subsequent calls return immediately.
#
# @param rdd The PipelinedRDD to materialize
# @param serializedMode The output serialization mode for the resulting
#        JavaRDD ("byte" by default, or "string")
setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
          function(rdd, serializedMode = "byte") {
            # Reuse the previously materialized JavaRDD if present.
            if (!is.null(rdd@env$jrdd_val)) {
              return(rdd@env$jrdd_val)
            }

            # Serialize the package names the workers should load.
            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                         connection = NULL)

            # Gather all registered broadcast variables to ship to workers.
            broadcastArr <- lapply(ls(.broadcastNames),
                                   function(name) { get(name, .broadcastNames) })

            # The pipelined R function is shipped as serialized bytes.
            serializedFuncArr <- serialize(rdd@func, connection = NULL)

            prev_jrdd <- rdd@prev_jrdd

            # StringRRDD produces string output; RRDD takes an explicit
            # output serialization mode.
            if (serializedMode == "string") {
              rddRef <- newJObject("org.apache.spark.api.r.StringRRDD",
                                   callJMethod(prev_jrdd, "rdd"),
                                   serializedFuncArr,
                                   rdd@env$prev_serializedMode,
                                   packageNamesArr,
                                   broadcastArr,
                                   callJMethod(prev_jrdd, "classTag"))
            } else {
              rddRef <- newJObject("org.apache.spark.api.r.RRDD",
                                   callJMethod(prev_jrdd, "rdd"),
                                   serializedFuncArr,
                                   rdd@env$prev_serializedMode,
                                   serializedMode,
                                   packageNamesArr,
                                   broadcastArr,
                                   callJMethod(prev_jrdd, "classTag"))
            }
            # Save the serialization flag after we create a RRDD
            rdd@env$serializedMode <- serializedMode
            rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD") # rddRef$asJavaRDD()
            rdd@env$jrdd_val
          })

# Validity check: the backing JVM object must be some flavor of JavaRDD.
setValidity("RDD",
            function(object) {
              jrdd <- getJRDD(object)
              cls <- callJMethod(jrdd, "getClass")
              className <- callJMethod(cls, "getName")
              # Use grepl(), which returns FALSE on no match. The previous
              # `grep(...) == 1` comparison produced integer(0) when the
              # pattern did not match, making the `if` condition error out
              # ("argument is of length zero") instead of reporting an
              # invalid class.
              if (grepl("spark.api.java.*RDD*", className)) {
                TRUE
              } else {
                paste("Invalid RDD class ", className)
              }
            })


############ Actions and Transformations ############

# Persist an RDD
#
# Persist this RDD with the default storage level (MEMORY_ONLY).
#
# @param x The RDD to cache
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10, 2L)
# cache(rdd)
#}
# @rdname cache-methods
# @aliases cache,RDD-method
setMethod("cache",
          signature(x = "RDD"),
          function(x) {
            # Ask the JVM to cache the backing JavaRDD, then record the
            # new state in the RDD's mutable environment.
            jrdd <- getJRDD(x)
            callJMethod(jrdd, "cache")
            x@env$isCached <- TRUE
            x
          })

# Persist an RDD
#
# Persist this RDD with the specified storage level. For details of the
# supported storage levels, refer to
# http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
#
# @param x The RDD to persist
# @param newLevel The new storage level to be assigned
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10, 2L)
# persist(rdd, "MEMORY_AND_DISK")
#}
# @rdname persist
# @aliases persist,RDD-method
setMethod("persist",
          signature(x = "RDD", newLevel = "character"),
          function(x, newLevel = "MEMORY_ONLY") {
            # Resolve the storage level string to its JVM object, then
            # persist the backing JavaRDD at that level.
            level <- getStorageLevel(newLevel)
            callJMethod(getJRDD(x), "persist", level)
            x@env$isCached <- TRUE
            x
          })

# Unpersist an RDD
#
# Mark the RDD as non-persistent, and remove all blocks for it from memory and
# disk.
#
# @param x The RDD to unpersist
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10, 2L)
# cache(rdd) # rdd@@env$isCached == TRUE
# unpersist(rdd) # rdd@@env$isCached == FALSE
#}
# @rdname unpersist-methods
# @aliases unpersist,RDD-method
setMethod("unpersist",
          signature(x = "RDD"),
          function(x) {
            # Drop the persisted blocks on the JVM and clear the local flag.
            jrdd <- getJRDD(x)
            callJMethod(jrdd, "unpersist")
            x@env$isCached <- FALSE
            x
          })

# Checkpoint an RDD
#
# Mark this RDD for checkpointing. It will be saved to a file inside the
# checkpoint directory set with setCheckpointDir() and all references to its
# parent RDDs will be removed. This function must be called before any job has
# been executed on this RDD. It is strongly recommended that this RDD is
# persisted in memory, otherwise saving it on a file will require recomputation.
#
# @param x The RDD to checkpoint
# @examples
#\dontrun{
# sc <- sparkR.init()
# setCheckpointDir(sc, "checkpoint")
# rdd <- parallelize(sc, 1:10, 2L)
# checkpoint(rdd)
#}
# @rdname checkpoint-methods
# @aliases checkpoint,RDD-method
setMethod("checkpoint",
          signature(x = "RDD"),
          function(x) {
            # Checkpointing is requested on the JVM side; recording the flag
            # locally also stops further pipelining over this RDD.
            callJMethod(getJRDD(x), "checkpoint")
            x@env$isCheckpointed <- TRUE
            x
          })

# Gets the number of partitions of an RDD
#
# @param x A RDD.
# @return the number of partitions of rdd as an integer.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10, 2L)
# numPartitions(rdd)  # 2L
#}
# @rdname numPartitions
# @aliases numPartitions,RDD-method
setMethod("numPartitions",
          signature(x = "RDD"),
          function(x) {
            # partitions() returns a Java List; its size is the partition count.
            callJMethod(callJMethod(getJRDD(x), "partitions"), "size")
          })

# Collect elements of an RDD
#
# @description
# \code{collect} returns a list that contains all of the elements in this RDD.
#
# @param x The RDD to collect
# @param ... Other optional arguments to collect
# @param flatten FALSE if the list should not be flattened
# @return a list containing elements in the RDD
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10, 2L)
# collect(rdd) # list from 1 to 10
# collectPartition(rdd, 0L) # list from 1 to 5
#}
# @rdname collect-methods
# @aliases collect,RDD-method
setMethod("collect",
          signature(x = "RDD"),
          function(x, flatten = TRUE) {
            # Assumes a pairwise RDD is backed by a JavaPairRDD.
            # Pull every element from the JVM and deserialize into an R list.
            items <- callJMethod(getJRDD(x), "collect")
            convertJListToRList(items, flatten,
                                serializedMode = getSerializedMode(x))
          })


# @description
# \code{collectPartition} returns a list that contains all of the elements
# in the specified partition of the RDD.
# @param partitionId the partition to collect (starts from 0)
# @rdname collect-methods
# @aliases collectPartition,integer,RDD-method
setMethod("collectPartition",
          signature(x = "RDD", partitionId = "integer"),
          function(x, partitionId) {
            # collectPartitions takes a list of partition indices and returns
            # one Java list per requested partition; we ask for exactly one.
            partitions <- callJMethod(getJRDD(x),
                                      "collectPartitions",
                                      as.list(as.integer(partitionId)))
            convertJListToRList(partitions[[1]], flatten = TRUE,
                                serializedMode = getSerializedMode(x))
          })

# @description
# \code{collectAsMap} returns a named list as a map that contains all of the elements
# in a key-value pair RDD.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
# collectAsMap(rdd) # list(`1` = 2, `3` = 4)
#}
# @rdname collect-methods
# @aliases collectAsMap,RDD-method
setMethod("collectAsMap",
          signature(x = "RDD"),
          function(x) {
            # Collect the (key, value) pairs, then build a named list keyed
            # by the string form of each key. Later duplicates overwrite
            # earlier ones, as with any environment assignment.
            pairs <- collect(x)
            resultEnv <- new.env()
            for (pair in pairs) {
              assign(as.character(pair[[1]]), pair[[2]], envir = resultEnv)
            }
            as.list(resultEnv)
          })

# Return the number of elements in the RDD.
#
# @param x The RDD to count
# @return number of elements in the RDD.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# count(rdd) # 10
# length(rdd) # Same as count
#}
# @rdname count
# @aliases count,RDD-method
setMethod("count",
          signature(x = "RDD"),
          function(x) {
            # Count each partition locally on the workers, then sum the
            # per-partition counts on the driver.
            perPartition <- collect(lapplyPartition(x, function(part) {
              as.integer(length(part))
            }))
            sum(as.integer(perPartition))
          })

# Return the number of elements in the RDD
# @export
# @rdname count
# length() on an RDD is simply an alias for count().
setMethod("length",
          signature(x = "RDD"),
          function(x) count(x))

# Return the count of each unique value in this RDD as a list of
# (value, count) pairs.
#
# Same as countByValue in Spark.
#
# @param x The RDD to count
# @return list of (value, count) pairs, where count is number of each unique
# value in rdd.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, c(1,2,3,2,1))
# countByValue(rdd) # (1,2L), (2,2L), (3,1L)
#}
# @rdname countByValue
# @aliases countByValue,RDD-method
setMethod("countByValue",
          signature(x = "RDD"),
          function(x) {
            # Map each element to (element, 1L) and sum the counts per key.
            pairs <- lapply(x, function(v) list(v, 1L))
            collect(reduceByKey(pairs, `+`, numPartitions(x)))
          })

# Apply a function to all elements
#
# This function creates a new RDD by applying the given transformation to all
# elements of the given RDD
#
# @param X The RDD to apply the transformation.
# @param FUN the transformation to apply on each element
# @return a new RDD created by the transformation.
# @rdname lapply
# @aliases lapply
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
# collect(multiplyByTwo) # 2,4,6...
#}
setMethod("lapply",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            # Lift the element-wise FUN to a per-partition function; the
            # partition index is ignored.
            applyToPartition <- function(partIndex, part) {
              lapply(part, FUN)
            }
            lapplyPartitionsWithIndex(X, applyToPartition)
          })

# @rdname lapply
# @aliases map,RDD,function-method
# map() is an alias for lapply() on RDDs.
setMethod("map",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) lapply(X, FUN))

# Flatten results after apply a function to all elements
#
# This function return a new RDD by first applying a function to all
# elements of this RDD, and then flattening the results.
#
# @param X The RDD to apply the transformation.
# @param FUN the transformation to apply on each element
# @return a new RDD created by the transformation.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
# collect(multiplyByTwo) # 2,20,4,40,6,60...
#}
# @rdname flatMap
# @aliases flatMap,RDD,function-method
setMethod("flatMap",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            # Apply FUN to every element of a partition, then flatten one
            # level of the resulting list-of-lists.
            partitionFunc <- function(part) {
              # Use FALSE rather than F: T/F are ordinary variables that can
              # be rebound, whereas TRUE/FALSE are reserved words.
              unlist(
                lapply(part, FUN),
                recursive = FALSE
              )
            }
            lapplyPartition(X, partitionFunc)
          })

# Apply a function to each partition of an RDD
#
# Return a new RDD by applying a function to each partition of this RDD.
#
# @param X The RDD to apply the transformation.
# @param FUN the transformation to apply on each partition.
# @return a new RDD created by the transformation.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
# collect(partitionSum) # 15, 40
#}
# @rdname lapplyPartition
# @aliases lapplyPartition,RDD,function-method
setMethod("lapplyPartition",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            # Drop the partition index and hand FUN the whole partition.
            lapplyPartitionsWithIndex(X, function(partIndex, part) FUN(part))
          })

# mapPartitions is the same as lapplyPartition.
#
# @rdname lapplyPartition
# @aliases mapPartitions,RDD,function-method
# mapPartitions() is an alias for lapplyPartition() on RDDs.
setMethod("mapPartitions",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) lapplyPartition(X, FUN))

# Return a new RDD by applying a function to each partition of this RDD, while
# tracking the index of the original partition.
#
# @param X The RDD to apply the transformation.
# @param FUN the transformation to apply on each partition; takes the partition
#        index and a list of elements in the particular partition.
# @return a new RDD created by the transformation.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10, 5L)
# prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
#                                          partIndex * Reduce("+", part) })
# collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
#}
# @rdname lapplyPartitionsWithIndex
# @aliases lapplyPartitionsWithIndex,RDD,function-method
setMethod("lapplyPartitionsWithIndex",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            # Execution is deferred: the returned PipelinedRDD merely records
            # FUN on top of X until a job forces materialization.
            PipelinedRDD(X, FUN)
          })

# @rdname lapplyPartitionsWithIndex
# @aliases mapPartitionsWithIndex,RDD,function-method
# mapPartitionsWithIndex() is an alias for lapplyPartitionsWithIndex().
setMethod("mapPartitionsWithIndex",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) lapplyPartitionsWithIndex(X, FUN))

# This function returns a new RDD containing only the elements that satisfy
# a predicate (i.e. returning TRUE in a given logical function).
# The same as `filter()' in Spark.
#
# @param x The RDD to be filtered.
# @param f A unary predicate function.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
#}
# @rdname filterRDD
# @aliases filterRDD,RDD,function-method
setMethod("filterRDD",
          signature(x = "RDD", f = "function"),
          function(x, f) {
            # Keep only elements for which the predicate returns TRUE,
            # working one partition at a time via base::Filter.
            lapplyPartition(x, function(part) Filter(f, part))
          })

# @rdname filterRDD
# @aliases Filter
# Filter() on an RDD delegates to filterRDD() with arguments swapped to
# match base::Filter's (f, x) order.
setMethod("Filter",
          signature(f = "function", x = "RDD"),
          function(f, x) filterRDD(x, f))

# Reduce across elements of an RDD.
#
# This function reduces the elements of this RDD using the
# specified commutative and associative binary operator.
#
# @param x The RDD to reduce
# @param func Commutative and associative function to apply on elements
#             of the RDD.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# reduce(rdd, "+") # 55
#}
# @rdname reduce
# @aliases reduce,RDD,ANY-method
setMethod("reduce",
          signature(x = "RDD", func = "ANY"),
          function(x, func) {
            # Reduce within each partition on the workers first, then reduce
            # the per-partition results on the driver. Requires func to be
            # commutative and associative.
            partials <- collect(lapplyPartition(x, function(part) {
                                  Reduce(func, part)
                                }),
                                flatten = FALSE)
            Reduce(func, partials)
          })

# Get the maximum element of an RDD.
#
# @param x The RDD to get the maximum element from
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# maximum(rdd) # 10
#}
# @rdname maximum
# @aliases maximum,RDD
# The maximum is a distributed reduce with base::max, which is both
# commutative and associative.
setMethod("maximum",
          signature(x = "RDD"),
          function(x) reduce(x, max))

# Get the minimum element of an RDD.
#
# @param x The RDD to get the minimum element from
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# minimum(rdd) # 1
#}
# @rdname minimum
# @aliases minimum,RDD
# The minimum is a distributed reduce with base::min.
setMethod("minimum",
          signature(x = "RDD"),
          function(x) reduce(x, min))

# Add up the elements in an RDD.
#
# @param x The RDD to add up the elements in
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# sumRDD(rdd) # 55
#}
# @rdname sumRDD
# @aliases sumRDD,RDD
# The sum is a distributed reduce with the "+" operator.
setMethod("sumRDD",
          signature(x = "RDD"),
          function(x) reduce(x, "+"))

# Applies a function to all elements in an RDD, and force evaluation.
#
# @param x The RDD to apply the function
# @param func The function to be applied.
# @return invisible NULL.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# foreach(rdd, function(x) { save(x, file=...) })
#}
# @rdname foreach
# @aliases foreach,RDD,function-method
setMethod("foreach",
          signature(x = "RDD", func = "function"),
          function(x, func) {
            # Run func purely for its side effects; each partition returns
            # NULL so nothing meaningful is shipped back to the driver.
            runPartition <- function(part) {
              lapply(part, func)
              NULL
            }
            invisible(collect(mapPartitions(x, runPartition)))
          })

# Applies a function to each partition in an RDD, and force evaluation.
#
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# foreachPartition(rdd, function(part) { save(part, file=...); NULL })
#}
# @rdname foreach
# @aliases foreachPartition,RDD,function-method
setMethod("foreachPartition",
          signature(x = "RDD", func = "function"),
          function(x, func) {
            # Collecting forces evaluation; invisible() hides the result,
            # since func is applied only for its side effects.
            invisible(collect(mapPartitions(x, func)))
          })

# Take elements from an RDD.
#
# This function takes the first NUM elements in the RDD and
# returns them in a list.
#
# @param x The RDD to take elements from
# @param num Number of elements to take
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# take(rdd, 2L) # list(1, 2)
#}
# @rdname take
# @aliases take,RDD,numeric-method
# Fetch partitions one at a time from the JVM, accumulating elements until
# either `num` elements have been gathered or all partitions are exhausted.
setMethod("take",
          signature(x = "RDD", num = "numeric"),
          function(x, num) {
            resList <- list()
            # index starts at -1 so the first loop iteration reads partition 0.
            index <- -1
            jrdd <- getJRDD(x)
            numPartitions <- numPartitions(x)
            serializedModeRDD <- getSerializedMode(x)

            # TODO(shivaram): Collect more than one partition based on size
            # estimates similar to the scala version of `take`.
            while (TRUE) {
              index <- index + 1

              # Stop once we have enough elements or run out of partitions.
              if (length(resList) >= num || index >= numPartitions)
                break

              # a JList of byte arrays
              partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
              partition <- partitionArr[[1]]

              # Only deserialize as many elements as we still need.
              size <- num - length(resList)
              # elems is capped to have at most `size` elements
              elems <- convertJListToRList(partition,
                                           flatten = TRUE,
                                           logicalUpperBound = size,
                                           serializedMode = serializedModeRDD)

              resList <- append(resList, elems)
            }
            resList
          })


# First
#
# Return the first element of an RDD
#
# @rdname first
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# first(rdd)
# }
setMethod("first",
          signature(x = "RDD"),
          function(x) {
            # Fetch a single element and unwrap it from the result list.
            firstElems <- take(x, 1)
            firstElems[[1]]
          })

# Removes the duplicates from RDD.
#
# This function returns a new RDD containing the distinct elements in the
# given RDD. The same as `distinct()' in Spark.
#
# @param x The RDD to remove duplicates from.
# @param numPartitions Number of partitions to create.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, c(1,2,2,3,3,3))
# sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
#}
# @rdname distinct
# @aliases distinct,RDD-method
setMethod("distinct",
          signature(x = "RDD"),
          function(x, numPartitions = SparkR:::numPartitions(x)) {
            # Key every element by itself (with a dummy NULL value), reduce
            # so duplicate keys collapse to one pair, then strip the dummies.
            keyed <- lapply(x, function(elem) list(elem, NULL))
            deduped <- reduceByKey(keyed,
                                   function(v1, v2) v1,
                                   numPartitions)
            lapply(deduped, function(pair) pair[[1]])
          })

# Return an RDD that is a sampled subset of the given RDD.
#
# The same as `sample()' in Spark. (We rename it due to signature
# inconsistencies with the `sample()' function in R's base package.)
#
# @param x The RDD to sample elements from
# @param withReplacement Sampling with replacement or not
# @param fraction The (rough) sample target fraction
# @param seed Randomness seed value
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
# collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
#}
# @rdname sampleRDD
# @aliases sampleRDD,RDD
setMethod("sampleRDD",
          signature(x = "RDD", withReplacement = "logical",
                    fraction = "numeric", seed = "integer"),
          function(x, withReplacement, fraction, seed) {

            # The sampler: takes a partition and returns its sampled version.
            samplingFunc <- function(partIndex, part) {
              set.seed(seed)
              # Preallocate to the partition size; len tracks how many slots
              # have been filled so far.
              res <- vector("list", length(part))
              len <- 0

              # Discards some random values to ensure each partition has a
              # different random seed.
              runif(partIndex)

              for (elem in part) {
                if (withReplacement) {
                  # With replacement: each element appears a Poisson(fraction)
                  # number of times.
                  count <- rpois(1, fraction)
                  if (count > 0) {
                    res[ (len + 1) : (len + count) ] <- rep(list(elem), count)
                    len <- len + count
                  }
                } else {
                  # Without replacement: keep each element independently with
                  # probability `fraction` (Bernoulli sampling).
                  if (runif(1) < fraction) {
                    len <- len + 1
                    res[[len]] <- elem
                  }
                }
              }

              # TODO(zongheng): look into the performance of the current
              # implementation. Look into some iterator package? Note that
              # Scala avoids many calls to creating an empty list and PySpark
              # similarly achieves this using `yield'.
              # Trim the preallocated list down to the slots actually used.
              if (len > 0)
                res[1:len]
              else
                list()
            }

            lapplyPartitionsWithIndex(x, samplingFunc)
          })

# Return a list of the elements that are a sampled subset of the given RDD.
#
# @param x The RDD to sample elements from
# @param withReplacement Sampling with replacement or not
# @param num Number of elements to return
# @param seed Randomness seed value
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:100)
# # exactly 5 elements sampled, which may not be distinct
# takeSample(rdd, TRUE, 5L, 1618L)
# # exactly 5 distinct elements sampled
# takeSample(rdd, FALSE, 5L, 16181618L)
#}
# @rdname takeSample
# @aliases takeSample,RDD
setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
                                  num = "integer", seed = "integer"),
          function(x, withReplacement, num, seed) {
            # This function is ported from RDD.scala.
            if (num < 0)
              stop("Negative number of elements requested")

            initialCount <- count(x)

            # Guard against the degenerate cases: sampling zero elements, or
            # sampling from an empty RDD, yields an empty list. Without this
            # guard, `fraction` below would be Inf (division by zero) causing
            # an infinite resampling loop, and `samples[1:0]` would wrongly
            # return one element instead of none.
            if (num == 0 || initialCount == 0) {
              return(list())
            }

            multiplier <- 3.0
            MAXINT <- .Machine$integer.max
            # Cap the selection size so it fits in an R integer.
            maxSelected <- min(initialCount, MAXINT - 1)

            if (num > initialCount && !withReplacement) {
              # Can't return more distinct elements than exist; oversample
              # by `multiplier` to make one pass usually sufficient.
              total <- maxSelected
              fraction <- multiplier * (maxSelected + 1) / initialCount
            } else {
              total <- num
              fraction <- multiplier * (num + 1) / initialCount
            }

            set.seed(seed)
            samples <- collect(sampleRDD(x, withReplacement, fraction,
                                         as.integer(ceiling(runif(1,
                                                                  -MAXINT,
                                                                  MAXINT)))))
            # If the first sample didn't turn out large enough, keep trying to
            # take samples; this shouldn't happen often because we use a big
            # multiplier for the initial size
            while (length(samples) < total)
              samples <- collect(sampleRDD(x, withReplacement, fraction,
                                           as.integer(ceiling(runif(1,
                                                                    -MAXINT,
                                                                    MAXINT)))))

            # TODO(zongheng): investigate if this call is an in-place shuffle?
            base::sample(samples)[1:total]
          })

# Creates tuples of the elements in this RDD by applying a function.
#
# @param x The RDD.
# @param func The function to be applied.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(1, 2, 3))
# collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
#}
# @rdname keyBy
# @aliases keyBy,RDD
setMethod("keyBy",
          signature(x = "RDD", func = "function"),
          function(x, func) {
            # Pair each element with its computed key: (func(elem), elem).
            lapply(x, function(elem) list(func(elem), elem))
          })

# Return a new RDD that has exactly numPartitions partitions.
# Can increase or decrease the level of parallelism in this RDD. Internally,
# this uses a shuffle to redistribute data.
# If you are decreasing the number of partitions in this RDD, consider using
# coalesce, which can avoid performing a shuffle.
#
# @param x The RDD.
# @param numPartitions Number of partitions to create.
# @seealso coalesce
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
# numPartitions(rdd)                   # 4
# numPartitions(repartition(rdd, 2L))  # 2
#}
# @rdname repartition
# @aliases repartition,RDD
setMethod("repartition",
          signature(x = "RDD", numPartitions = "numeric"),
          function(x, numPartitions) {
            # Repartitioning always shuffles; callers that only decrease the
            # partition count may prefer coalesce() to avoid the shuffle.
            coalesce(x, numPartitions, TRUE)
          })

# Return a new RDD that is reduced into numPartitions partitions.
#
# @param x The RDD.
# @param numPartitions Number of partitions to create.
# @seealso repartition
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
# numPartitions(rdd)               # 3
# numPartitions(coalesce(rdd, 1L)) # 1
#}
# @rdname coalesce
# @aliases coalesce,RDD
# When shuffle = TRUE, or when the partition count is being increased,
# elements are redistributed via partitionBy; otherwise the shuffle-free
# JVM-side coalesce is used.
setMethod("coalesce",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
             if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
               # Assign each element a target partition by cycling through
               # partitions from a per-partition random starting offset, so
               # data spreads evenly.
               func <- function(partIndex, part) {
                 set.seed(partIndex)  # partIndex as seed
                 start <- as.integer(base::sample(numPartitions, 1) - 1)
                 lapply(seq_along(part),
                        function(i) {
                          pos <- (start + i) %% numPartitions
                          list(pos, part[[i]])
                        })
               }
               shuffled <- lapplyPartitionsWithIndex(x, func)
               repartitioned <- partitionBy(shuffled, numPartitions)
               # Keep only the values; the keys were just routing targets.
               values(repartitioned)
             } else {
               jrdd <- callJMethod(getJRDD(x), "coalesce", numPartitions, shuffle)
               RDD(jrdd)
             }
           })

# Save this RDD as a SequenceFile of serialized objects.
#
# @param x The RDD to save
# @param path The directory where the file is saved
# @seealso objectFile
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:3)
# saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
#}
# @rdname saveAsObjectFile
# @aliases saveAsObjectFile,RDD
setMethod("saveAsObjectFile",
          signature(x = "RDD", path = "character"),
          function(x, path) {
            # objectFile() can only read back byte-serialized data, so
            # re-serialize any RDD that is not already in "byte" mode.
            if (getSerializedMode(x) != "byte") {
              x <- serializeToBytes(x)
            }
            # Invoked for its side effect; return nothing visible.
            invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path))
          })

# Save this RDD as a text file, using string representations of elements.
#
# @param x The RDD to save
# @param path The directory where the partitions of the text file are saved
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:3)
# saveAsTextFile(rdd, "/tmp/sparkR-tmp")
#}
# @rdname saveAsTextFile
# @aliases saveAsTextFile,RDD
setMethod("saveAsTextFile",
          signature(x = "RDD", path = "character"),
          function(x, path) {
            # Convert every element to its string representation first.
            stringRdd <- lapply(x, function(elem) {
              toString(elem)
            })
            # Invoked for its side effect; return nothing visible.
            invisible(
              callJMethod(getJRDD(stringRdd, serializedMode = "string"),
                          "saveAsTextFile", path))
          })

# Sort an RDD by the given key function.
#
# @param x An RDD to be sorted.
# @param func A function used to compute the sort key for each element.
# @param ascending A flag to indicate whether the sorting is ascending or descending.
# @param numPartitions Number of partitions to create.
# @return An RDD where all elements are sorted.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(3, 2, 1))
# collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
#}
# @rdname sortBy
# @aliases sortBy,RDD,RDD-method
setMethod("sortBy",
          signature(x = "RDD", func = "function"),
          function(x, func, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
            # Key each element with func, sort by that key, then drop the keys.
            keyed <- keyBy(x, func)
            values(sortByKey(keyed, ascending, numPartitions))
          })

# Helper function to get first N elements from an RDD in the specified order.
# Param:
#   x An RDD.
#   num Number of elements to return.
#   ascending A flag to indicate whether the sorting is ascending or descending.
# Return:
#   A list of the first N elements from the RDD in the specified order.
#
# Helper function to get first N elements from an RDD in the specified order.
# Param:
#   x An RDD.
#   num Number of elements to return.
#   ascending A flag to indicate whether the sorting is ascending or descending.
# Return:
#   A list of the first N elements from the RDD in the specified order. If the
#   RDD holds fewer than `num` elements, all of them are returned (sorted).
takeOrderedElem <- function(x, num, ascending = TRUE) {
  if (num <= 0L) {
    return(list())
  }

  # Pre-trim each partition to at most `num` elements in the desired order,
  # so only the candidates are shipped back to the driver.
  partitionFunc <- function(part) {
    if (num < length(part)) {
      # R limitation: order works only on primitive types!
      ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
      part[ord[1:num]]
    } else {
      part
    }
  }

  newRdd <- mapPartitions(x, partitionFunc)

  resList <- list()
  index <- -1
  jrdd <- getJRDD(newRdd)
  numPartitions <- numPartitions(newRdd)
  serializedModeRDD <- getSerializedMode(newRdd)

  # Pull candidates partition by partition, then do a final global sort.
  while (TRUE) {
    index <- index + 1

    if (index >= numPartitions) {
      ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
      # Clamp to the number of collected elements: indexing ord past its
      # length with 1:num would yield NA indices and NULL entries whenever
      # num exceeds the RDD's total element count.
      resList <- resList[ord[seq_len(min(num, length(ord)))]]
      break
    }

    # a JList of byte arrays
    partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
    partition <- partitionArr[[1]]

    # elems is capped to have at most `num` elements
    elems <- convertJListToRList(partition,
                                 flatten = TRUE,
                                 logicalUpperBound = num,
                                 serializedMode = serializedModeRDD)

    resList <- append(resList, elems)
  }
  resList
}

# Returns the first N elements from an RDD in ascending order.
#
# @param x An RDD.
# @param num Number of elements to return.
# @return The first N elements from the RDD in ascending order.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
# takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
#}
# @rdname takeOrdered
# @aliases takeOrdered,RDD,RDD-method
setMethod("takeOrdered",
          signature(x = "RDD", num = "integer"),
          function(x, num) {
            # Smallest N elements, i.e. ascending order.
            takeOrderedElem(x, num, ascending = TRUE)
          })

# Returns the top N elements from an RDD.
#
# @param x An RDD.
# @param num Number of elements to return.
# @return The top N elements from the RDD.
# @rdname top
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
# top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
#}
# @rdname top
# @aliases top,RDD,RDD-method
setMethod("top",
          signature(x = "RDD", num = "integer"),
          function(x, num) {
            # Largest N elements, i.e. descending order.
            takeOrderedElem(x, num, ascending = FALSE)
          })

# Fold an RDD using a given associative function and a neutral "zero value".
#
# Aggregate the elements of each partition, and then the results for all the
# partitions, using a given associative function and a neutral "zero value".
#
# @param x An RDD.
# @param zeroValue A neutral "zero value".
# @param op An associative function for the folding operation.
# @return The folding result.
# @rdname fold
# @seealso reduce
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
# fold(rdd, 0, "+") # 15
#}
# @rdname fold
# @aliases fold,RDD,RDD-method
setMethod("fold",
          signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
          function(x, zeroValue, op) {
            # fold() is an aggregate where the same associative op serves as
            # both the within-partition and cross-partition combiner.
            aggregateRDD(x, zeroValue, op, op)
          })

# Aggregate an RDD using the given combine functions and a neutral "zero value".
#
# Aggregate the elements of each partition, and then the results for all the
# partitions, using given combine functions and a neutral "zero value".
#
# @param x An RDD.
# @param zeroValue A neutral "zero value".
# @param seqOp A function to aggregate the RDD elements. It may return a different
#              result type from the type of the RDD elements.
# @param combOp A function to aggregate results of seqOp.
# @return The aggregation result.
# @rdname aggregateRDD
# @seealso reduce
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(1, 2, 3, 4))
# zeroValue <- list(0, 0)
# seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
# combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
# aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
#}
# @rdname aggregateRDD
# @aliases aggregateRDD,RDD,RDD-method
setMethod("aggregateRDD",
          signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
          function(x, zeroValue, seqOp, combOp) {
            # Fold each partition locally with seqOp, starting from zeroValue.
            foldPartition <- function(part) {
              Reduce(seqOp, part, zeroValue)
            }

            # Collect one result per partition, then merge them with combOp.
            perPartition <- collect(lapplyPartition(x, foldPartition),
                                    flatten = FALSE)
            Reduce(combOp, perPartition, zeroValue)
          })

# Pipes elements to a forked external process.
#
# The same as 'pipe()' in Spark.
#
# @param x The RDD whose elements are piped to the forked external process.
# @param command The command to fork an external process.
# @param env A named list to set environment variables of the external process.
# @return A new RDD created by piping all elements to a forked external process.
# @rdname pipeRDD
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:10)
# collect(pipeRDD(rdd, "more"))
# Output: c("1", "2", ..., "10")
#}
# @rdname pipeRDD
# @aliases pipeRDD,RDD,character-method
setMethod("pipeRDD",
          signature(x = "RDD", command = "character"),
          function(x, command, env = list()) {
            partitionFunc <- function(part) {
              # Strip trailing newlines so each element becomes one clean
              # input line for the external process.
              trimTrailing <- function(elem) {
                sub("[\r\n]*$", "", toString(elem))
              }
              inputLines <- unlist(lapply(part, trimTrailing))
              outputLines <- system2(command, stdout = TRUE,
                                     input = inputLines, env = env)
              lapply(outputLines, trimTrailing)
            }
            lapplyPartition(x, partitionFunc)
          })

# TODO: Consider caching the name in the RDD's environment
# Return an RDD's name.
#
# @param x The RDD whose name is returned.
# @rdname name
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(1,2,3))
# name(rdd) # NULL (if not set before)
#}
# @rdname name
# @aliases name,RDD
setMethod("name",
          signature(x = "RDD"),
          function(x) {
            # Delegate to the backing JavaRDD's name().
            callJMethod(getJRDD(x), "name")
          })

# Set an RDD's name.
#
# @param x The RDD whose name is to be set.
# @param name The RDD name to be set.
# @return a new RDD renamed.
# @rdname setName
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list(1,2,3))
# setName(rdd, "myRDD")
# name(rdd) # "myRDD"
#}
# @rdname setName
# @aliases setName,RDD
setMethod("setName",
          signature(x = "RDD", name = "character"),
          function(x, name) {
            # Set the name on the backing JavaRDD, then return the RDD so the
            # call can be chained.
            callJMethod(getJRDD(x), "setName", name)
            x
          })

# Zip an RDD with generated unique Long IDs.
#
# Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
# n is the number of partitions. So there may exist gaps, but this
# method won't trigger a spark job, which is different from
# zipWithIndex.
#
# @param x An RDD to be zipped.
# @return An RDD with zipped items.
# @seealso zipWithIndex
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
# collect(zipWithUniqueId(rdd))
# # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
#}
# @rdname zipWithUniqueId
# @aliases zipWithUniqueId,RDD
setMethod("zipWithUniqueId",
          signature(x = "RDD"),
          function(x) {
            numParts <- numPartitions(x)

            # The j-th element (1-based) of partition k gets id
            # (j - 1) * numParts + k, which is unique across partitions
            # without triggering a Spark job.
            assignIds <- function(partIndex, part) {
              mapply(
                function(item, j) {
                  list(item, (j - 1) * numParts + partIndex)
                },
                part,
                seq_along(part),
                SIMPLIFY = FALSE)
            }

            lapplyPartitionsWithIndex(x, assignIds)
          })

# Zip an RDD with its element indices.
#
# The ordering is first based on the partition index and then the
# ordering of items within each partition. So the first item in
# the first partition gets index 0, and the last item in the last
# partition receives the largest index.
#
# This method needs to trigger a Spark job when this RDD contains
# more than one partition.
#
# @param x An RDD to be zipped.
# @return An RDD with zipped items.
# @seealso zipWithUniqueId
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
# collect(zipWithIndex(rdd))
# # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
#}
# @rdname zipWithIndex
# @aliases zipWithIndex,RDD
setMethod("zipWithIndex",
          signature(x = "RDD"),
          function(x) {
            numParts <- numPartitions(x)
            if (numParts > 1) {
              # Count the elements of each partition (this triggers a job) and
              # build the cumulative start index of every partition boundary.
              counts <- collect(lapplyPartition(x,
                                                function(part) {
                                                  list(length(part))
                                                }))
              startIndices <- Reduce("+", counts, accumulate = TRUE)
            }

            partitionFunc <- function(partIndex, part) {
              # Partition 0 starts at index 0; partition k starts right after
              # the last element of partition k - 1.
              startIndex <- if (partIndex == 0) {
                0
              } else {
                startIndices[[partIndex]]
              }

              mapply(
                function(item, pos) {
                  list(item, pos - 1 + startIndex)
                },
                part,
                seq_along(part),
                SIMPLIFY = FALSE)
            }

            lapplyPartitionsWithIndex(x, partitionFunc)
          })

# Coalesce all elements within each partition of an RDD into a list.
#
# @param x An RDD.
# @return An RDD created by coalescing all elements within
#         each partition into a list.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, as.list(1:4), 2L)
# collect(glom(rdd))
# # list(list(1, 2), list(3, 4))
#}
# @rdname glom
# @aliases glom,RDD
setMethod("glom",
          signature(x = "RDD"),
          function(x) {
            # Wrap each whole partition in a single-element list.
            lapplyPartition(x, function(part) {
              list(part)
            })
          })

############ Binary Functions #############

# Return the union RDD of two RDDs.
# The same as union() in Spark.
#
# @param x An RDD.
# @param y An RDD.
# @return a new RDD created by performing the simple union (without removing
# duplicates) of two input RDDs.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:3)
# unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
#}
# @rdname unionRDD
# @aliases unionRDD,RDD,RDD-method
setMethod("unionRDD",
          signature(x = "RDD", y = "RDD"),
          function(x, y) {
            if (getSerializedMode(x) == getSerializedMode(y)) {
              # Both sides share a serialization mode: union them directly.
              union.rdd <- RDD(callJMethod(getJRDD(x), "union", getJRDD(y)),
                               getSerializedMode(x))
            } else {
              # The serialization modes differ, so normalize both sides to
              # "byte" before taking the union.
              if (getSerializedMode(x) != "byte") x <- serializeToBytes(x)
              if (getSerializedMode(y) != "byte") y <- serializeToBytes(y)
              union.rdd <- RDD(callJMethod(getJRDD(x), "union", getJRDD(y)),
                               "byte")
            }
            union.rdd
          })

# Zip an RDD with another RDD.
#
# Zips this RDD with another one, returning key-value pairs with the
# first element in each RDD, second element in each RDD, etc. Assumes
# that the two RDDs have the same number of partitions and the same
# number of elements in each partition (e.g. one was made through
# a map on the other).
#
# @param x An RDD to be zipped.
# @param other Another RDD to be zipped.
# @return An RDD zipped from the two RDDs.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd1 <- parallelize(sc, 0:4)
# rdd2 <- parallelize(sc, 1000:1004)
# collect(zipRDD(rdd1, rdd2))
# # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
#}
# @rdname zipRDD
# @aliases zipRDD,RDD
setMethod("zipRDD",
          signature(x = "RDD", other = "RDD"),
          function(x, other) {
            # Zipping requires matching partition counts on both sides.
            if (numPartitions(x) != numPartitions(other)) {
              stop("Can only zip RDDs which have the same number of partitions.")
            }

            rdds <- appendPartitionLengths(x, other)
            zippedJrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
            # The zipped elements are of scala Tuple2 type; the serialization
            # flag describes the elements inside the tuples.
            zipped <- RDD(zippedJrdd, getSerializedMode(rdds[[1]]))

            mergePartitions(zipped, TRUE)
          })

# Cartesian product of this RDD and another one.
#
# Return the Cartesian product of this RDD and another one,
# that is, the RDD of all pairs of elements (a, b) where a
# is in this and b is in other.
#
# @param x An RDD.
# @param other An RDD.
# @return A new RDD which is the Cartesian product of these two RDDs.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd <- parallelize(sc, 1:2)
# sortByKey(cartesian(rdd, rdd))
# # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
#}
# @rdname cartesian
# @aliases cartesian,RDD,RDD-method
setMethod("cartesian",
          signature(x = "RDD", other = "RDD"),
          function(x, other) {
            rdds <- appendPartitionLengths(x, other)
            productJrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian",
                                       getJRDD(rdds[[2]]))
            # The product's elements are of scala Tuple2 type; the
            # serialization flag describes the elements inside the tuples.
            product <- RDD(productJrdd, getSerializedMode(rdds[[1]]))

            mergePartitions(product, FALSE)
          })

# Subtract an RDD with another RDD.
#
# Return an RDD with the elements from this that are not in other.
#
# @param x An RDD.
# @param other An RDD.
# @param numPartitions Number of the partitions in the result RDD.
# @return An RDD with the elements from this that are not in other.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
# rdd2 <- parallelize(sc, list(2, 4))
# collect(subtract(rdd1, rdd2))
# # list(1, 1, 3)
#}
# @rdname subtract
# @aliases subtract,RDD
setMethod("subtract",
          signature(x = "RDD", other = "RDD"),
          function(x, other, numPartitions = SparkR:::numPartitions(x)) {
            # Tag each element as a (element, NA) pair, subtract by key, then
            # drop the dummy NA values, keeping only the surviving elements.
            toPair <- function(e) { list(e, NA) }
            pairedX <- map(x, toPair)
            pairedOther <- map(other, toPair)
            keys(subtractByKey(pairedX, pairedOther, numPartitions))
          })

# Intersection of this RDD and another one.
#
# Return the intersection of this RDD and another one.
# The output will not contain any duplicate elements,
# even if the input RDDs did. Performs a hash partition
# across the cluster.
# Note that this method performs a shuffle internally.
#
# @param x An RDD.
# @param other An RDD.
# @param numPartitions The number of partitions in the result RDD.
# @return An RDD which is the intersection of these two RDDs.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
# rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
# collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
# # list(1, 2, 3)
#}
# @rdname intersection
# @aliases intersection,RDD
setMethod("intersection",
          signature(x = "RDD", other = "RDD"),
          function(x, other, numPartitions = SparkR:::numPartitions(x)) {
            # Tag each element as a (element, NA) pair so cogroup can match
            # elements across the two RDDs by value.
            pairedX <- map(x, function(v) { list(v, NA) })
            pairedOther <- map(other, function(v) { list(v, NA) })

            # Keep only keys that have a non-empty group on both sides.
            presentInBoth <- function(elem) {
              iters <- elem[[2]]
              all(as.vector(
                lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
            }

            cogrouped <- cogroup(pairedX, pairedOther, numPartitions = numPartitions)
            keys(filterRDD(cogrouped, presentInBoth))
          })

# Zips an RDD's partitions with one (or more) RDD(s).
# Same as zipPartitions in Spark.
#
# @param ... RDDs to be zipped.
# @param func A function to transform zipped partitions.
# @return A new RDD by applying a function to the zipped partitions.
#         Assumes that all the RDDs have the *same number of partitions*, but
#         does *not* require them to have the same number of elements in each partition.
# @examples
#\dontrun{
# sc <- sparkR.init()
# rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
# rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
# rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
# collect(zipPartitions(rdd1, rdd2, rdd3,
#                       func = function(x, y, z) { list(list(x, y, z))} ))
# # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
#}
# @rdname zipRDD
# @aliases zipPartitions,RDD
setMethod("zipPartitions",
          "RDD",
          function(..., func) {
            rrdds <- list(...)
            # A single RDD has nothing to zip with; return it unchanged.
            if (length(rrdds) == 1) {
              return(rrdds[[1]])
            }
            nPart <- sapply(rrdds, numPartitions)
            if (length(unique(nPart)) != 1) {
              stop("Can only zipPartitions RDDs which have the same number of partitions.")
            }

            # Key every partition's contents by its partition index so that
            # corresponding partitions of all RDDs can be grouped together.
            # Note: a stray debug print(length(part)) used to live here; it
            # wrote partition sizes to worker stdout on every evaluation.
            rrdds <- lapply(rrdds, function(rdd) {
              mapPartitionsWithIndex(rdd, function(partIndex, part) {
                list(list(partIndex, part))
              })
            })
            union.rdd <- Reduce(unionRDD, rrdds)
            # Group by partition index; each group holds one partition's
            # contents from every input RDD, in input order.
            zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
            # Apply func to the zipped partition contents.
            res <- mapPartitions(zipped.rdd, function(plist) {
              do.call(func, plist[[1]])
            })
            res
          })
